AWS CLIを使ってKinesis Data Streamsのレコードをシーケンス番号から取得してみた
Amazon Kinesis Data Streamsのストリームにレコードを追加すると、時間とともに単調増加するシーケンス番号が割り振られ、ポインターとしてデータが保存されます。 このようにすることで、シャード内ではレコードが時系列順に並びます。
逆に、ストリーム内のデータにアクセスする場合、シャードを指定し、イテレーターを移動させながら、シーケンシャルにレコードをアクセスします。
Producerはシャードを束ねたストリームに対して操作しますが、Consumerは特定のシャードに対して操作します。 ご注意ください。
本記事では
- シャード番号
- シーケンス番号
が与えられた時に、その実レコードを AWS CLIで取得する方法を紹介します。
Kinesisストリームを処理するワーカーにデッドレターキュー(DLQ)を設定し、エラーの原因になったレコードを確認するようなユースケースを想定しています。
デッドレターキューメッセージのサンプル
Kinesis Data StreamsをEventBridge Pipes経由でLambdaで処理し、リトライ上限に到達すると、以下の様なメッセージが DLQ に送信されます。
{ "context": { "partnerResourceArn": "arn:aws:pipes:eu-central-1:123456789012:pipe/k_worker", "condition": "RetryAttemptsExhausted" }, "version": "1.0", "timestamp": "2023-04-29T15:47:42.068Z", "KinesisBatchInfo": { "shardId": "shardId-000000000000", "startSequenceNumber": "49640293986203916304020474349387293118703111245603536898", "endSequenceNumber": "49640293986203916304020474349387293118703111245603536898", "approximateArrivalOfFirstRecord": "2023-04-29T15:46:40.681Z", "approximateArrivalOfLastRecord": "2023-04-29T15:46:40.681Z", "batchSize": 1, "streamArn": "arn:aws:kinesis:eu-central-1:123456789012:stream/dummy" } }
condition
からリトライ上限に達したことKinesisBatchInfo
の各種属性から、問題の起きたストリーム、シャード、シーケンス情報
などがわかります。
対象は1レコードのため
startSequenceNumber
endSequenceNumber
が同じです。
レコードを復元
以下の流れでレコードを復元します。
- シャードのイテレーターを取得
- レコードを取得
- レコードをデコード
AWS CLIで実際に操作してみます。
1. シャードのイテレーターを取得
まずはシャードのイテレーターを取得します。
DLQメッセージからシャード番号(shardId
)とシーケンス番号(startSequenceNumber
)がわかるため、この位置(AT_SEQUENCE_NUMBER
)のイテレーターを Kinesis:GetShardIterator API で取得します。
$ aws kinesis get-shard-iterator \ --stream-name dummy \ --shard-id shardId-000000000000 \ --starting-sequence-number 49640293986203916304020474349387293118703111245603536898 \ --shard-iterator-type AT_SEQUENCE_NUMBER { "ShardIterator": "AAAAAAAAAAFcBeKIp1K/3NLqn0Dvh6eg0HAeZqfCbBP2Ll32+6ULArVn9TwbQTp0o3oGqJ/nd5W8CiCivPC0rph6JxTJHEzdP1PnTYTHl5yZDcDMZl9sxhfVTF6km/7OoH0LQDd3L/G2x3ny0yuY8dxEI3B7kddheD3YeQ4k0j9gleYDpvDcIS2C8I2/kFk7Bf++3DTW0BV4Ey+gk+cYY7dXoapfGPoJLXTmWXCByU9X3Q7amxjWyg==" }
イテレーターの位置の指定方法は、今回紹介したシーケンス番号以外にも、
- 最古のシーケンス番号(
TRIM_HORIZON
) - 最新のシーケンス番号=新しく受け取ったレコードを処理したい(
LATEST
) - 時刻指定(
AT_TIMESTAMP
)
など、様々な指定方法があります。
2. レコードを取得
次に、イテレーターの位置にあるレコードを Kinesis:GetRecords API で取得します。
$ aws kinesis get-records \ --shard-iterator "AAAAAAAAAAG7Ge6krGEz+zKHgkvwuwXru3G7U+yGWrxQTrnHKBaqh1rrG5PBeOCjlxTjD51Un3buWuzv8TuP/5CscJtodQik5YITrD8c4QhzHNuU2kMGqgOwWq3bNyGucxNmPW3/GSMc2mR/ncaZsThlK8qjFpQCIqTfkPjFsaZjloXHhZNKDbkRylJ13Gy6jWGClv1koNKQVs7juREKJdhT3/UiMGmh3UlBz5XmF47DN09AbicsiQ==" { "Records": [ { "SequenceNumber": "49640293986203916304020474349387293118703111245603536898", "ApproximateArrivalTimestamp": "2023-04-29T15:46:40.681000+00:00", "Data": "Zm9vCg==", "PartitionKey": "1" }, { "SequenceNumber": "49640293986203916304020474349388502044522725874778243074", "ApproximateArrivalTimestamp": "2023-04-29T15:46:40.681000+00:00", "Data": "YmFyCg==", "PartitionKey": "2" } ], "NextShardIterator": "AAAAAAAAAAHxP+XECJ/Nh5l3VqaeH1tQzICjBoXBrvFWwxV00F6Nq07yXKK0aW5T66ktKoICag1XD/+oYds1ghVw5KBSc+wYUSDPVrTGRBp/RISWrjn80UtMk/h/vcv3YFKUMJj6YzZ7u5/D2yZDVIYvYKQ9Gfvxi1dS2UUjWfTAIwXmYzCGGX2VEHVMAUP91zD9EF9V7jc4GJKJL8LGnUFBH0of9EAKQF8DAGDRPMKh4o9QPJLuJQ==", "MillisBehindLatest": 825000 }
- イテレーター(
ShardIterator
/NextShardIterator
) - 2つのレコード、及びその
SequenceNumber
を図示すると、次の様になります。
ShardIterator
と NextShardIterator
の間にレコードがあり、レコードの SequenceNumber
は単調増加しています。
取得するレコード数を --limit
で指定することもできます。
$ aws kinesis get-records \ --shard-iterator "AAAAAAAAAAG7Ge6krGEz+zKHgkvwuwXru3G7U+yGWrxQTrnHKBaqh1rrG5PBeOCjlxTjD51Un3buWuzv8TuP/5CscJtodQik5YITrD8c4QhzHNuU2kMGqgOwWq3bNyGucxNmPW3/GSMc2mR/ncaZsThlK8qjFpQCIqTfkPjFsaZjloXHhZNKDbkRylJ13Gy6jWGClv1koNKQVs7juREKJdhT3/UiMGmh3UlBz5XmF47DN09AbicsiQ==" \ --limit 1 { "Records": [ { "SequenceNumber": "49640293986203916304020474349387293118703111245603536898", "ApproximateArrivalTimestamp": "2023-04-29T15:46:40.681000+00:00", "Data": "Zm9vCg==", "PartitionKey": "1" } ], "NextShardIterator": "AAAAAAAAAAGu9ytcyEWrMtXHn23JxQKROdrhWR0Dsx5ja88iIID0X5qoKv3QKByQlnkNLzyJ+VY2J9FbwZUEwKdUtryRpI52/Ha1RtNkKJxbrGyrSDTzeidQezxuXgKLq03+d50Qwl24hI7B+g4t70rfOlJkHaeD3IfofjZZm6oWtCUoUTyn/oewUSSA8pgxuP6dz0lorI9yzGVBQuF6xW0oBuN9XDlXU0FrDc+brkpWajB4jYQ2GA==", "MillisBehindLatest": 1555000 }
3. レコードをデコード
レコードは base64 エンコードされているため、デコードします。
$ echo 'Zm9vCg==' | base64 -d foo
最後に
Kinesis Data Streamsを利用する際、Lambda が登場してからは
- Lambda Event Source Mapping
- Event Bridge Pipes
等を利用することで、シャードやシーケンスを意識することなく簡単にストリーム処理できるようになりました。 とはいえ、障害発生時には、今回紹介したような手順で、シャードやシーケンスを明示的に指定してエラーの原因になったレコードを確認することも必要です。
Kinesisの実運用では、エラーの起きたレコードを DLQ SQS などに一度飛ばし、Lambda ワーカー経由でKinesisから問題のレコードを取得してSlackなどに通知すると、障害対応の初動の手間が省けると思います。
それでは。